package com.atguigu.chapter11;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Minimal Flink SQL walkthrough: turns a small in-memory stream of
 * {@link WaterSensor} elements into a {@link Table}, registers it as the
 * temporary view {@code sensor}, and prints the result of a
 * {@code select *} query against that view.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/7/24 10:03
 */
public class Flink05_Sql_BaseUse {

    /**
     * Program entry point; builds the environment, registers the view and
     * prints the query result.
     *
     * @param args command-line arguments (not read)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();
        DataStreamSource<WaterSensor> readings = sampleReadings(env);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table sensorTable = tableEnv.fromDataStream(readings);

        // Note: tableEnv.executeSql() is the entry point for DDL and for
        // non-query DML (insert / update / delete) statements.

        // A Table that has not been registered can also be queried by
        // concatenating the Table object directly into the SQL text, i.e.
        // "select * from " + sensorTable. Here the registered-view form is used.
        tableEnv.createTemporaryView("sensor", sensorTable);

        Table everything = tableEnv.sqlQuery("select * from sensor");
        everything.execute().print();
    }

    /**
     * Creates the stream execution environment with "rest.port" set to 20000
     * and a default parallelism of 1.
     */
    private static StreamExecutionEnvironment createEnvironment() {
        Configuration settings = new Configuration();
        // presumably pins the port of the local REST endpoint / web UI — confirm
        settings.setInteger("rest.port", 20000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(settings);
        env.setParallelism(1);
        return env;
    }

    /**
     * Builds the fixed five-element demo source on the given environment.
     *
     * @param env environment the source is attached to
     * @return the bounded source of sample {@link WaterSensor} elements
     */
    private static DataStreamSource<WaterSensor> sampleReadings(StreamExecutionEnvironment env) {
        return env.fromElements(
            new WaterSensor("sensor_1", 1L, 10),
            new WaterSensor("sensor_1", 2L, 20),
            new WaterSensor("sensor_2", 3L, 30),
            new WaterSensor("sensor_1", 4L, 40),
            new WaterSensor("sensor_1", 5L, 50)
        );
    }
}
